package com.atguigu.flink0624.chapter07.state;

import com.atguigu.flink0624.bean.WaterSensor;
import com.atguigu.flink0624.util.AtguiguUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Comparator;
import java.util.List;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/11/15 15:25
 */
public class Flink04_KeyedState_Value {
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
        
        // 返回值就是每条数据的事件时间
        env
            .socketTextStream("hadoop162", 9999)
            .map(line -> {
                String[] data = line.split(",");
                return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
            })
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((element, ts) -> element.getTs())
            )
            .keyBy(WaterSensor::getId)
            .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                
                private ListState<Integer> top3VcState;
                
                @Override
                public void open(Configuration parameters) throws Exception {
                    
                    top3VcState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("top3VcState", Integer.class));
                }
                
                @Override
                public void processElement(WaterSensor value,
                                           Context ctx,
                                           Collector<String> out) throws Exception {
                    top3VcState.add(value.getVc());
                    
                    List<Integer> list = AtguiguUtil.toList(top3VcState.get());
                    
                    //                    list.sort((o1, o2) -> o2.compareTo(o1));
                    list.sort(Comparator.reverseOrder());  // 原地排序
                    
                    if (list.size() == 4) {
                        list.remove(list.size() - 1);
                    }
                    
                    top3VcState.update(list);
                    
                    out.collect(ctx.getCurrentKey() + "    " + list);
                }
            })
            .print();
        
        env.execute();
        
    }
}
